package com.stay4it.rxjava;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import com.stay4it.rxjava.bean.Student;

import rx.Observable;
import rx.Subscriber;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.schedulers.Schedulers;

/**
 * RxJava条件和布尔操作符 本节参考文章：http://blog.csdn.net/jdsjlzx/article/details/54767751
 * 
 * 
 * All Amb Contains DefaultIfEmpty SequenceEqual SkipUntil SkipWhile TakeUntil TakeWhile
 */
public class RxJava08 {

	static List<Student> studentList = new ArrayList<Student>() {
		{
			add(new Student("Stay", 28));
			add(new Student("谷歌小弟", 23));
			add(new Student("Star", 25));
		}
	};

	/**
	 * all 
	 * 用来判断 observable 中发射的所有数据是否都满足一个条件。
	 * 
	 */
	private static void test1() {
		Observable.from(studentList).all(new Func1<Student, Boolean>() {

			@Override
			public Boolean call(Student student) {
				return student.age > 25;
			}
		}).subscribe(new Action1<Boolean>() {

			@Override
			public void call(Boolean value) {
				System.out.println("onSuccess value = " + value);
			}
		});

	}

	/**
	 * amb 解释：对于给定两个或多个Observables，它只发射首先发射数据或通知的那个Observable的所有数据。
	 * 
	 * 当你传递多个Observable给amb操作符时，该操作符只发射其中一个Observable的数据和通知：
	 * 首先发送通知给amb操作符的的那个Observable，不管发射的是一项数据还是一个onError或onCompleted通知，
	 * amb将忽略和丢弃其它所有Observables的发射数据。
	 * 
	 * Javadoc: amb(T o1, T ... o2)（可接受2到9个参数）
	 */
	private static void test2() {
		Observable.amb(Observable.just(1,2,3,4,5).delay(2, TimeUnit.SECONDS),
				Observable.just(100,101))
				.subscribe(new Action1<Integer>() {

					@Override
					public void call(Integer value) {
						System.out.println("call value = " + value);
					}
				});

	}

	/**
	 * contains 
	 * 用来判断源Observable所发射的数据是否包含某一个数据，如果包含会返回true，如果源Observable已经结束了却还没有发射这个数据则返回false。
	 * 
	 * 注意：test3运行结果是false，原因是Student类中没有重写equals方法
	 * 
	 */
	private static void test3() {
		Observable.from(studentList).contains(new Student("Stay", 28)).subscribe(new Action1<Boolean>() {

			@Override
			public void call(Boolean value) {
				System.out.println("onSuccess value = " + value);
			}
		});

	}

	/**
	 * exists 解释：exists操作符类似与contains操作符，不同的是，其接受一个函数参数，在函数中，
	 * 对原Observable发射的数据，设定比对条件并做判断。若任何一项满足条件就创建并返回一
	 * 个发射true的Observable，否则返回一个发射false的Observable。
	 * 
	 */
	private static void test4() {
		Observable.from(studentList).exists(new Func1<Student, Boolean>() {

			@Override
			public Boolean call(Student student) {
				return student.age == 25;
			}
		}).subscribe(new Action1<Boolean>() {

			@Override
			public void call(Boolean value) {
				System.out.println("onSuccess value = " + value);
			}
		});

	}

	/**
	 * isEmpty 
	 * 解释：isEmpty操作符用于判定原始Observable是否没有发射任何数据。若原Observable未发射任何数据，
	 * 创建并返回一个发射true的Observable，否则返回一个发射false的Observable。
	 * 
	 */
	private static void test5() {
		Observable
		.from(studentList)
		.isEmpty()
		.subscribe(new Action1<Boolean>() {

			@Override
			public void call(Boolean value) {
				System.out.println("call value = " + value);
			}
		});

	}

	/**
	 * defaultIfEmpty 
	 * defaultIfEmpty操作接受一个备用数据，在原Observable没有发射任何数据正常终止（以onCompleted的形式），
	 * 该操作符以备用数据创建一个Observable并将数据发射出去。
	 * 
	 */
	private static void test6() {
		Observable.from(new ArrayList<Student>())
		.subscribeOn(Schedulers.trampoline())
		.defaultIfEmpty(new Student("will", 20))
		.subscribe(new Action1<Student>() {

					@Override
					public void call(Student t) {
						System.out.println("call value = " + t);

					}
				});

	}

	/**
	 * sequenceEqual 
	 * 解释：sequenceEqual操作符比较两个Observable发射的数据，
	 * 如果发射数据的序列是相同的（相同的数据，相同的顺序，相同的终止状态），则返回true，否则返回false
	 * 
	 * @throws InterruptedException 
	 * 
	 */
	private static void test7() throws InterruptedException {
		Observable
		.sequenceEqual(Observable.just(1, 2), Observable.just(1, 2).delay(1, TimeUnit.SECONDS))
				.subscribeOn(Schedulers.trampoline())
				.subscribe(new Action1<Boolean>() {
					@Override
					public void call(Boolean value) {
						System.out.println("call value = " + value);
					}
				});
		
		Thread.sleep(3000);

	}

	/**
	 * skipUntil 
	 * SkipUntil订阅原始的Observable，但是忽略它的发射物，直到第二个Observable发射了一项数据那一刻，它开始发射原始Observable。
	 * 
	 * @throws InterruptedException 
	 * 
	 */
	private static void test8() throws InterruptedException {
		Observable.interval(1, TimeUnit.SECONDS) //从0开始
        .take(6)
        .skipUntil(Observable.just(10).delay(3,TimeUnit.SECONDS))
	    .subscribe(new Subscriber<Long>() {
	        @Override
	        public void onCompleted() {

	        }

	        @Override
	        public void onError(Throwable e) {

	        }

	        @Override
	        public void onNext(Long value) {
	        	System.out.println("skipUntil value = " + value);
	            
	        }
	    });
		Thread.sleep(6000);
	}
	
	/**
	 * SkipWhile 
	 * SkipWhile订阅原始的Observable，但是忽略它的发射物，直到你指定的某个条件变为false的那一刻，它开始发射原始Observable。
	 * @throws InterruptedException 
	 * 
	 */
	private static void test9() throws InterruptedException {
		Observable.interval(1, TimeUnit.SECONDS) //从0开始
		.take(6)
		.skipWhile(new Func1<Long, Boolean>() {//谓词函数（有2个参数就是二元谓词函数）
            @Override
            public Boolean call(Long aLong) {
                return aLong<3;   //舍弃原Observable发射的数据，直到发射的数据>=3，才继续发射
            }
        })
		.subscribe(new Subscriber<Long>() {
			@Override
			public void onCompleted() {
				
			}
			
			@Override
			public void onError(Throwable e) {
				
			}
			
			@Override
			public void onNext(Long value) {
				System.out.println("skipWhile value = " + value);
				
			}
		});
		Thread.sleep(6000);
	}
	
	/**
	 * takeUntil 
	 * takeUntil操作符与skipUntil操作符作用相反，当第二个Observable发射了一项数据或者终止时，原Observable停止发射任何数据。
	 * 
	 * @throws InterruptedException 
	 * 
	 */
	private static void test10() throws InterruptedException {
		Observable.interval(500, TimeUnit.MILLISECONDS) //从0开始
        .takeUntil(Observable.just(100, 101, 102).delay(3, TimeUnit.SECONDS))
	    .subscribe(new Subscriber<Long>() {
	        @Override
	        public void onCompleted() {

	        }

	        @Override
	        public void onError(Throwable e) {

	        }

	        @Override
	        public void onNext(Long value) {
	        	System.out.println("takeUntil value = " + value);
	            
	        }
	    });
		Thread.sleep(6000);
	}
	
	/**
	 * TakeWhile 
	 * takeWhile操作符与skipWhile操作符作用相反，发射原始Observable，直到你指定的某个条件不成立的那一刻，它停止发射原始Observable，并终止自己的Observable。
	 * @throws InterruptedException 
	 * 
	 */
	private static void test11() throws InterruptedException {
		Observable.interval(1, TimeUnit.SECONDS) //从0开始
		.takeWhile(new Func1<Long, Boolean>() {
            @Override
            public Boolean call(Long aLong) {
                return aLong<3;   
            }
        })
		.subscribe(new Subscriber<Long>() {
			@Override
			public void onCompleted() {
				
			}
			
			@Override
			public void onError(Throwable e) {
				
			}
			
			@Override
			public void onNext(Long value) {
				System.out.println("takeWhile value = " + value);
				
			}
		});
		Thread.sleep(6000);
	}
	
	public static void main(String[] args) throws InterruptedException {
		test11();

	}

}
